# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This code sample demonstrates using the low-level generated client for Python.
# Provide your project, dataset, and table details in the append_rows_proto2
# call at the bottom of this file.

from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import writer
from google.protobuf import descriptor_pb2

##################################################################################
# To regenerate the protocol buffer module from your sample_data.proto file, run:
#
#   protoc --python_out=. sample_data.proto
#
##################################################################################

import sample_data_pb2


def append_rows_proto2(project_id: str, dataset_id: str, table_id: str):
    """Create a write stream, write some sample data, and commit the stream."""
    write_client = bigquery_storage_v1.BigQueryWriteClient()
    parent = write_client.table_path(project_id, dataset_id, table_id)
    write_stream = types.WriteStream()

    # When creating the stream, choose the type. Use the PENDING type to wait
    # until the stream is committed before it is visible. See:
    # https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1#google.cloud.bigquery.storage.v1.WriteStream.Type
    write_stream.type_ = types.WriteStream.Type.PENDING
    write_stream = write_client.create_write_stream(
        parent=parent, write_stream=write_stream
    )
    stream_name = write_stream.name

    # Create a template with fields needed for the first request.
    request_template = types.AppendRowsRequest()

    # The initial request must contain the stream name.
    request_template.write_stream = stream_name

    # So that BigQuery knows how to parse the serialized_rows, generate a
    # protocol buffer representation of your message descriptor.
    proto_schema = types.ProtoSchema()
    proto_descriptor = descriptor_pb2.DescriptorProto()
    sample_data_pb2.SampleData.DESCRIPTOR.CopyToProto(proto_descriptor)
    proto_schema.proto_descriptor = proto_descriptor
    proto_data = types.AppendRowsRequest.ProtoData()
    proto_data.writer_schema = proto_schema
    request_template.proto_rows = proto_data

    # Some stream types support an unbounded number of requests. Construct an
    # AppendRowsStream to send an arbitrary number of requests to a stream.
    append_rows_stream = writer.AppendRowsStream(write_client, request_template)
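    # sample_data.proto itself is not shown in this file. As a reference only,
    # a proto2 definition consistent with the fields set below might look like
    # the following (the field names must match the table schema; the field
    # numbers here are illustrative assumptions, not taken from the real file):
    #
    #   syntax = "proto2";
    #
    #   message SampleData {
    #     optional string creationTime = 1;
    #     optional string id = 2;
    #     optional string name = 3;
    #     optional string severity = 4;
    #     optional string source = 5;
    #     optional string destination = 6;
    #     optional bool accessGranted = 7;
    #     optional string labels = 8;
    #   }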
    # Create a batch of row data by appending proto2 serialized bytes to the
    # serialized_rows repeated field.
    proto_rows = types.ProtoRows()

    row = sample_data_pb2.SampleData()
    row.creationTime = "2020-10-22 21:12:44 UTC"
    row.id = "fZTLIbppjFXOmy"
    row.name = "User Login"
    row.severity = "info"
    row.source = "Login Server"
    row.destination = "www.acmelogin.com"
    row.accessGranted = True
    row.labels = '{"property":"Authentication Success","threatRating":0.1,"type":"login","method":"bqStorageAPI"}'
    proto_rows.serialized_rows.append(row.SerializeToString())

    row = sample_data_pb2.SampleData()
    row.creationTime = "2020-10-22 21:28:14 UTC"
    row.id = "aOShYdrZEOdUfC"
    row.name = "User Login"
    row.severity = "warning"
    row.accessGranted = False
    row.labels = '{"property":"Authentication Success","threatRating":null,"method":"bqStorageAPI"}'
    proto_rows.serialized_rows.append(row.SerializeToString())

    # Set an offset to allow resuming this stream if the connection breaks.
    # Keep track of which requests the server has acknowledged and resume the
    # stream at the first non-acknowledged message. If the server has already
    # processed a message with that offset, it will return an ALREADY_EXISTS
    # error, which can be safely ignored.
    #
    # The first request must always have an offset of 0.
    request = types.AppendRowsRequest()
    request.offset = 0
    proto_data = types.AppendRowsRequest.ProtoData()
    proto_data.rows = proto_rows
    request.proto_rows = proto_data

    response_future_1 = append_rows_stream.send(request)

    # Second batch.
    proto_rows = types.ProtoRows()

    row = sample_data_pb2.SampleData()
    row.creationTime = "2020-10-22 21:57:14 UTC"
    row.id = "VNxAfWbpdPimKG"
    row.name = "User Logout"
    row.severity = "info"
    row.source = "Login Server"
    row.destination = "www.acmelogin.com"
    row.accessGranted = True
    row.labels = '{"property":"Authentication Success","threatRating":0.1,"type":"log out","method":"bqStorageAPI"}'
    proto_rows.serialized_rows.append(row.SerializeToString())

    row = sample_data_pb2.SampleData()
    row.creationTime = "2020-10-22 21:24:14 UTC"
    row.id = "vZWRZbgZcCEalB"
    row.name = "User Login"
    row.severity = "critical"
    row.accessGranted = False
    row.labels = '{"property":"Authentication Success","threatRating":0.5,"type":"login failure","c":[10, 20,"test"],"method":"bqStorageAPI"}'
    proto_rows.serialized_rows.append(row.SerializeToString())

    # Since this is the second request, you only need to include the row data.
    # The name of the stream and the protocol buffer DESCRIPTOR are only needed
    # in the first request.
    request = types.AppendRowsRequest()
    proto_data = types.AppendRowsRequest.ProtoData()
    proto_data.rows = proto_rows
    request.proto_rows = proto_data

    # The offset must equal the number of rows that were previously sent.
    request.offset = 2

    response_future_2 = append_rows_stream.send(request)

    # Both requests are in flight; wait for them to finish being processed
    # before finalizing the stream.
    print(response_future_1.result())
    print(response_future_2.result())

    # Shutdown background threads and close the streaming connection.
    append_rows_stream.close()
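    # If the connection had dropped mid-stream, the appends could be resumed by
    # re-sending batches starting from the first unacknowledged offset. A
    # minimal sketch of ignoring the ALREADY_EXISTS error mentioned above
    # (google.api_core is installed alongside this client library):
    #
    #   from google.api_core import exceptions
    #
    #   try:
    #       append_rows_stream.send(request).result()
    #   except exceptions.AlreadyExists:
    #       # The server already processed this offset; safe to skip.
    #       pass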
    # A PENDING type stream must be "finalized" before being committed. No new
    # records can be written to the stream after this method has been called.
    write_client.finalize_write_stream(name=write_stream.name)

    # Commit the stream you created earlier.
    batch_commit_write_streams_request = types.BatchCommitWriteStreamsRequest()
    batch_commit_write_streams_request.parent = parent
    batch_commit_write_streams_request.write_streams = [write_stream.name]
    write_client.batch_commit_write_streams(batch_commit_write_streams_request)

    print(f"Writes to stream: '{write_stream.name}' have been committed.")


if __name__ == "__main__":
    # Change these to your specific BigQuery project, dataset, and table details.
    append_rows_proto2("", "json_example", "streaming_events")
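
# To verify the committed rows, you could query the table afterwards, for
# example with the separate google-cloud-bigquery client library (assumed to
# be installed and configured with a default project; the table below matches
# the sample arguments above):
#
#   from google.cloud import bigquery
#
#   client = bigquery.Client()
#   query = "SELECT id, name, severity FROM `json_example.streaming_events`"
#   for result_row in client.query(query):
#       print(result_row)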